/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.coders;



import com.bff.gaia.unified.sdk.util.BufferedElementCountingOutputStream;

import com.bff.gaia.unified.sdk.util.VarInt;

import com.bff.gaia.unified.sdk.util.common.ElementByteSizeObservableIterable;

import com.bff.gaia.unified.sdk.util.common.ElementByteSizeObserver;



import java.io.*;

import java.util.*;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;



/**

 * An abstract base class with functionality for assembling a {@link Coder} for a class that

 * implements {@code Iterable}.

 *

 * <p>To complete a subclass, implement the {@link #decodeToIterable} method. This superclass will

 * decode the elements in the input stream into a {@link List} and then pass them to that method to

 * be converted into the appropriate iterable type. Note that this means the input iterables must

 * fit into memory.

 *

 * <p>The format of this coder is as follows:

 *

 * <ul>

 *   <li>If the input {@link Iterable} is a {@link Collection}, its size is written to the output
 *       stream as a 4-byte big endian integer, followed by all of the encoded elements.
 *   <li>Otherwise the size is not known up front: the value {@code -1} is written as a 4-byte big
 *       endian integer, followed by zero or more blocks. Each block is a variable-length encoded
 *       element count followed by that many encoded elements. The sequence is terminated by a
 *       count of zero.

 * </ul>

 *

 * @param <T> the type of the elements of the {@code Iterable}s being transcoded

 * @param <IterableT> the type of the Iterables being transcoded

 */

public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
    extends StructuredCoder<IterableT> {

  /** Returns the coder used to encode and decode each individual element. */
  public Coder<T> getElemCoder() {
    return elementCoder;
  }

  /**
   * Builds an instance of {@code IterableT}, this coder's associated {@link Iterable}-like subtype,
   * from a list of decoded elements.
   *
   * @param decodedElements the elements read from the stream, in the order they were decoded
   * @return the iterable handed back to the caller of {@link #decode}
   */
  protected abstract IterableT decodeToIterable(List<T> decodedElements);

  /////////////////////////////////////////////////////////////////////////////
  // Internal operations below here.

  /** Bytes taken by the leading {@code int} that {@link #encode} always writes. */
  private static final long SIZE_FIELD_BYTES = 4L;

  /** Bytes accounted for the zero count that terminates the unknown-size format. */
  private static final long TERMINATOR_BYTES = 1L;

  private final Coder<T> elementCoder;
  private final String iterableName;

  /**
   * Creates the coder.
   *
   * @param elementCoder coder for the individual elements; must not be null
   * @param iterableName human readable name of the iterable type, used in error messages; must
   *     not be null
   * @throws IllegalArgumentException if either argument is null
   */
  protected IterableLikeCoder(Coder<T> elementCoder, String iterableName) {
    checkArgument(elementCoder != null, "element Coder for IterableLikeCoder must not be null");
    checkArgument(iterableName != null, "iterable name for IterableLikeCoder must not be null");
    this.elementCoder = elementCoder;
    this.iterableName = iterableName;
  }

  /**
   * Encodes {@code iterable} onto {@code outStream}.
   *
   * <p>A {@link Collection} is written as its size (4-byte {@code int}) followed by the encoded
   * elements. Any other iterable is written as the {@code int} {@code -1} followed by the output
   * of a {@link BufferedElementCountingOutputStream} wrapped around the elements.
   *
   * @throws CoderException if {@code iterable} is null
   * @throws IOException if writing to {@code outStream} fails
   */
  @Override
  public void encode(IterableT iterable, OutputStream outStream)
      throws IOException, CoderException {
    if (iterable == null) {
      throw new CoderException("cannot encode a null " + iterableName);
    }
    // The wrapper is flushed but deliberately not closed: outStream belongs to the caller.
    DataOutputStream dataOutStream = new DataOutputStream(outStream);
    if (iterable instanceof Collection) {
      // We can know the size of the Iterable.  Use an encoding with a
      // leading size field, followed by that many elements.
      Collection<T> collection = (Collection<T>) iterable;
      dataOutStream.writeInt(collection.size());
      for (T elem : collection) {
        elementCoder.encode(elem, dataOutStream);
      }
    } else {
      // We don't know the size without traversing it so use a fixed size buffer
      // and encode as many elements as possible into it before outputting the size followed
      // by the elements.
      dataOutStream.writeInt(-1);
      BufferedElementCountingOutputStream countingOutputStream =
          new BufferedElementCountingOutputStream(dataOutStream);
      for (T elem : iterable) {
        countingOutputStream.markElementStart();
        elementCoder.encode(elem, countingOutputStream);
      }
      countingOutputStream.finish();
    }
    // Make sure all our output gets pushed to the underlying outStream.
    dataOutStream.flush();
  }

  /**
   * Decodes an iterable previously written by {@link #encode}.
   *
   * <p>All elements are collected into an in-memory {@link List} before being handed to {@link
   * #decodeToIterable}.
   *
   * @throws IOException if reading from {@code inStream} fails
   */
  @Override
  public IterableT decode(InputStream inStream) throws IOException, CoderException {
    DataInputStream dataInStream = new DataInputStream(inStream);
    int size = dataInStream.readInt();
    if (size >= 0) {
      // Known-size format: exactly `size` elements follow.
      List<T> elements = new ArrayList<>(size);
      for (int i = 0; i < size; i++) {
        elements.add(elementCoder.decode(dataInStream));
      }
      return decodeToIterable(elements);
    }

    // Unknown-size format: a sequence of blocks, each introduced by a variable-length count of
    // the elements it holds. A count that is not positive ends the sequence.
    List<T> elements = new ArrayList<>();
    long remainingInBlock = VarInt.decodeLong(dataInStream);
    while (remainingInBlock > 0L) {
      elements.add(elementCoder.decode(dataInStream));
      --remainingInBlock;
      if (remainingInBlock == 0L) {
        // Current block exhausted; read the count of the next block (or the terminator).
        remainingInBlock = VarInt.decodeLong(dataInStream);
      }
    }
    return decodeToIterable(elements);
  }

  /** Returns a single-element list holding the element coder. */
  @Override
  public List<? extends Coder<?>> getCoderArguments() {
    return Arrays.asList(elementCoder);
  }

  /**
   * {@inheritDoc}
   *
   * @throws NonDeterministicException always. Encoding is not deterministic for the general {@link
   *     Iterable} case, as it depends upon the type of iterable. This may allow two objects to
   *     compare as equal while the encoding differs.
   */
  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    throw new NonDeterministicException(
        this, "IterableLikeCoder can not guarantee deterministic ordering.");
  }

  /**
   * {@inheritDoc}
   *
   * @return {@code true} if the iterable is of a known class that supports lazy counting of byte
   *     size, since that requires minimal extra computation.
   */
  @Override
  public boolean isRegisterByteSizeObserverCheap(IterableT iterable) {
    return iterable instanceof ElementByteSizeObservableIterable;
  }

  /**
   * Reports the encoded size of {@code iterable} to {@code observer}.
   *
   * <p>If the iterable is an {@link ElementByteSizeObservableIterable}, the observer is switched
   * to lazy mode and sizes are reported as the iterable's elements are returned. Otherwise the
   * iterable is traversed here and every element is reported immediately.
   *
   * @throws CoderException if {@code iterable} is null
   */
  @Override
  public void registerByteSizeObserver(IterableT iterable, ElementByteSizeObserver observer)
      throws Exception {
    if (iterable == null) {
      // Same wording as encode(), so the message names the concrete iterable type.
      throw new CoderException("cannot encode a null " + iterableName);
    }

    if (iterable instanceof ElementByteSizeObservableIterable) {
      observer.setLazy();
      ElementByteSizeObservableIterable<?, ?> observableIterable =
          (ElementByteSizeObservableIterable<?, ?>) iterable;
      observableIterable.addObserver(
          new IteratorObserver(observer, iterable instanceof Collection));
    } else if (iterable instanceof Collection) {
      // We can know the size of the Iterable.  Use an encoding with a
      // leading size field, followed by that many elements.
      Collection<T> collection = (Collection<T>) iterable;
      observer.update(SIZE_FIELD_BYTES);
      for (T elem : collection) {
        elementCoder.registerByteSizeObserver(elem, observer);
      }
    } else {
      // TODO: (BEAM-1537) Update to use an accurate count depending on size and count,
      // currently we are under estimating the size by up to 10 bytes per block of data since we
      // are not encoding the count prefix which occurs at most once per 64k of data and is upto
      // 10 bytes long. Since we include the total count we can upper bound the underestimate
      // to be 10 / 65536 ~= 0.0153% of the actual size.
      observer.update(SIZE_FIELD_BYTES);
      long count = 0;
      for (T elem : iterable) {
        count += 1;
        elementCoder.registerByteSizeObserver(elem, observer);
      }
      if (count > 0) {
        // Update the length based upon the number of counted elements, this helps
        // eliminate the case where all the elements are encoded in the first block and
        // it is quite short (e.g. Long.MAX_VALUE nulls encoded with VoidCoder).
        observer.update(VarInt.getLength(count));
      }
      // Update with the terminator byte.
      observer.update(TERMINATOR_BYTES);
    }
  }

  /**
   * An observer that gets notified when an observable iterator returns a new value. This observer
   * just notifies an outerObserver about this event. Additionally, the outerObserver is notified
   * about the framing bytes that are transparently added by this coder.
   *
   * <p>For an iterable that is not a {@link Collection}, the framing is accounted for the same way
   * as in the eager branch of {@code registerByteSizeObserver}: 4 bytes for the size field, 1 byte
   * for the terminator, and the variable-length encoding of the total element count. Because the
   * total is not known until iteration ends, the count prefix is reported incrementally, each
   * time one more element makes its encoding longer.
   */
  private static class IteratorObserver implements Observer {
    private final ElementByteSizeObserver outerObserver;
    private final boolean countable;

    /** Elements seen so far; only maintained when {@code countable} is false. */
    private long elementCount;

    /** Count-prefix bytes already passed on to {@code outerObserver}. */
    private long reportedCountPrefixBytes;

    public IteratorObserver(ElementByteSizeObserver outerObserver, boolean countable) {
      this.outerObserver = outerObserver;
      this.countable = countable;

      if (countable) {
        // Additional 4 bytes are due to size.
        outerObserver.update(SIZE_FIELD_BYTES);
      } else {
        // Additional 5 bytes are due to size = -1 (4 bytes) and
        // the terminating zero count (1 byte).
        outerObserver.update(SIZE_FIELD_BYTES + TERMINATOR_BYTES);
      }
    }

    /**
     * Forwards the byte size of one returned element to the outer observer.
     *
     * @param obs the observable that fired the notification
     * @param obj the element's encoded size; must be a {@link Long}
     * @throws AssertionError if {@code obj} is not a {@link Long}
     */
    @Override
    public void update(Observable obs, Object obj) {
      if (!(obj instanceof Long)) {
        throw new AssertionError("unexpected parameter object");
      }

      if (countable) {
        outerObserver.update(obs, obj);
        return;
      }

      // The encoded form carries no per-element marker; elements are grouped into blocks that
      // are introduced by a variable-length count. Only charge the bytes by which that count
      // prefix has grown now that one more element has been seen.
      elementCount++;
      long countPrefixBytes = VarInt.getLength(elementCount);
      long additionalPrefixBytes = countPrefixBytes - reportedCountPrefixBytes;
      reportedCountPrefixBytes = countPrefixBytes;
      outerObserver.update(obs, (Long) obj + additionalPrefixBytes);
    }
  }
}